package com.derbysoft.nuke.kafka.manager.domain.repository;

import com.derbysoft.nuke.common.module.config.ApplicationConfiguration;
import com.derbysoft.nuke.common.module.config.Config;
import com.derbysoft.nuke.common.module.json.Json;
import com.derbysoft.nuke.kafka.manager.domain.model.Cluster;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * In-memory store of {@link Cluster} definitions, keyed by cluster code.
 *
 * <p>Every mutating operation serializes the whole map to JSON and writes it to the
 * {@code "clusters"} property of {@link ApplicationConfiguration}. The map is populated
 * from that same property through {@link #setClusters(Map)}.
 *
 * <p>The backing map is a concurrent map, which does not accept {@code null} keys or
 * values; the public methods therefore guard against {@code null} codes explicitly.
 *
 * Created by Albert Fu on 12/06/2018.
 */
@Repository
public class ClusterRepository {
    private static final String CONFIG_KEY_CLUSTERS = "clusters";

    private final Map<String, Cluster> clusters = Maps.newConcurrentMap();

    /**
     * Looks up a cluster by its code.
     *
     * @param code the cluster code; may be {@code null}
     * @return the matching cluster, or {@code null} when {@code code} is {@code null} or unknown
     */
    public Cluster getClusterByCode(String code) {
        // A concurrent map throws on a null key, so answer "not found" instead.
        if (code == null) {
            return null;
        }
        return clusters.get(code);
    }

    /**
     * Stores (or replaces) a cluster under its own code and writes the updated map to the
     * configuration.
     *
     * @param cluster the cluster to store; must not be {@code null} and must carry a non-null code
     * @throws NullPointerException if {@code cluster} or its code is {@code null}
     */
    public void saveCluster(Cluster cluster) {
        Objects.requireNonNull(cluster, "cluster must not be null");
        String code = Objects.requireNonNull(cluster.getCode(), "cluster code must not be null");
        clusters.put(code, cluster);
        updateConfig();
    }

    /**
     * Returns a snapshot of all stored clusters.
     *
     * @return a new list holding the current clusters; changes to it do not affect the repository
     */
    public List<Cluster> getClusters() {
        return Lists.newArrayList(clusters.values());
    }

    /**
     * Removes the cluster with the given code, if present, and writes the updated map to the
     * configuration.
     *
     * @param code the code of the cluster to remove; {@code null} is ignored
     */
    public void deleteCluster(String code) {
        if (code == null) {
            return;
        }
        clusters.remove(code);
        updateConfig();
    }

    /**
     * Removes every cluster whose code appears in {@code codes}, then writes the updated map to
     * the configuration once.
     *
     * @param codes the codes to remove; a {@code null} or empty array is a no-op, and
     *              {@code null} elements are skipped
     */
    public void deleteClusters(String[] codes) {
        if (ArrayUtils.isEmpty(codes)) {
            return;
        }
        for (String code : codes) {
            // Skip null entries: failing mid-loop would leave earlier removals
            // unpersisted because updateConfig() would never run.
            if (code != null) {
                clusters.remove(code);
            }
        }
        updateConfig();
    }

    /** Serializes the current cluster map to JSON and stores it under the "clusters" property. */
    private void updateConfig() {
        ApplicationConfiguration.get().setProperty(CONFIG_KEY_CLUSTERS, Json.getDefault().toJsonString(clusters));
    }

    /**
     * Loads clusters from their configuration representation. Each entry is converted to a
     * {@link Cluster}, has its code set to the entry key, and is put into the repository.
     * Clusters already stored under codes absent from {@code clusterMap} are left untouched.
     *
     * <p>NOTE(review): presumably invoked by the configuration module for the
     * {@code "clusters"} property because of the {@code @Config} annotation; confirm against
     * that module.
     *
     * @param clusterMap cluster code to raw cluster attributes; {@code null} is ignored, as are
     *                   entries with a {@code null} key, a {@code null} value, or a value that
     *                   converts to {@code null}
     */
    @Config(CONFIG_KEY_CLUSTERS)
    public void setClusters(Map<String, Map<String, Object>> clusterMap) {
        if (clusterMap == null) {
            return;
        }
        for (Map.Entry<String, Map<String, Object>> entry : clusterMap.entrySet()) {
            String code = entry.getKey();
            Map<String, Object> attributes = entry.getValue();
            if (code == null || attributes == null) {
                continue;
            }
            Cluster cluster = Json.getDefault().toJavaObject(attributes, Cluster.class);
            if (cluster == null) {
                continue;
            }
            cluster.setCode(code);
            clusters.put(code, cluster);
        }
    }
}
